Thomas Asikis, ETH, Fall 2019
This notebook aims to provide some simple examples of reinforcement learning based on Finite Markov Decision Processes. The exact code of each method is found in the relevant helper files that are in the same folder as this file!
# Inject a CSS rule so multiple outputs of one cell are laid out side by side.
from IPython.display import display, HTML

# CSS applied to the notebook's output container (flex row instead of column)
CSS = """
.output {
flex-direction: row;
}
"""
# returning the HTML object as the last expression renders the <style> tag
HTML('<style>' + CSS + '</style>')
# some general libraries are loaded first!
%load_ext autoreload
%autoreload 2
import sys
sys.path.append("./")  # make the helper modules next to this notebook importable
import numpy as np
import pandas as pd
import random
from abc import abstractmethod
from IPython.display import IFrame, display, Image, Markdown
import matplotlib.pyplot as plt
import plotly as pl
from plotly import figure_factory as ff
from plotly import graph_objs as go
from plotly.offline import iplot, plot, init_notebook_mode
init_notebook_mode()  # enable offline plotly rendering inside the notebook
# seed both the numpy and the stdlib RNGs so runs are reproducible
np.random.seed(1)
random.seed(1)
Remember the diagram of shapes that represented the MDP in the slides? Well, here it is again:

Essentially, in the code below we transform the above MDP into a fully functioning RL environment, just by changing some string representations to numerical ones!
from gym import Env
from gym.spaces import Discrete
class FiniteMDPEnv(Env):
    """A gym-compatible environment defined by an explicit finite-MDP transition table.

    Each table row (see ``define_transition``) carries: from_state, action,
    to_state, transition probability, a deterministic reward value and a
    reward-distribution object. Next states are sampled from the table via a
    cumulative-probability column.
    """

    def __init__(self, transitions: list, terminal_state, stable_baselines=False):
        """
        Initialize a FiniteMDP environment based on a transition list and a terminal state.
        :param transitions: list of transition dicts (as built by ``define_transition``)
        :param terminal_state: the state representation that ends an episode
        :param stable_baselines: when True, ``step`` returns integer state indices
            instead of the raw state representations
        """
        self.stable_baselines = stable_baselines  # for appropriate numeric transformations!
        # generate transitions table
        self.transitions = transitions
        self.terminal_state = terminal_state
        self.transitions_df = pd.DataFrame(transitions)
        # enforce a fixed column order for the table
        self.transitions_df = self.transitions_df[['from_state', 'action', 'to_state', 'prob',
                                                   'reward', 'reward_distro']]
        # cumulative transition probability per (from_state, action) group;
        # used for inverse-CDF sampling of the next state in step()
        self.transitions_df['cumul_prob'] = self.transitions_df.groupby(['from_state', 'action'])[
            'prob'].cumsum()
        # determine environment characteristics relevant to gym
        # agent actions, sorted for consistency of action appearance through different transition
        # implementations with same actions
        self.possible_actions = sorted(self.transitions_df['action'].unique().tolist())
        # all states appearing anywhere in the table, as source or destination
        self.possible_states = set(self.transitions_df['from_state'].unique().tolist() +
                                   self.transitions_df['to_state'].unique().tolist())
        self.possible_states = sorted(list(self.possible_states))
        self.action_space = Discrete(len(self.possible_actions))
        self.state_space = Discrete(len(self.possible_states))  # current budget
        self.observation_space = Discrete(len(self.possible_states))
        # Variables to be initialized in reset
        self.done = None
        self.total_reward = None
        self.time_step = None
        self.current_state = None
        self.reset()

    def step(self, action):
        """
        Applies an action to the current environment, advances the state, calculates reward and
        determines if a terminal state is reached.
        :param action: The action to be taken (a raw action label, or an integer index
            when called by a gym/stable-baselines agent)
        :return: A tuple: The new state representation, the reward, a boolean denoting if
            environment reached terminal state and finally a dictionary with meta information.
        """
        # integer actions coming from gym agents are mapped back to their labels
        if self.stable_baselines or isinstance(action, int) or isinstance(action, np.int64):
            action = self.possible_actions[action]
        # inverse-CDF sampling: draw u ~ U[0, 1) and keep the rows of the current
        # (state, action) pair whose cumulative probability is at least u ...
        transition_prob = np.random.random()
        from_state_filt = self.transitions_df['from_state'] == self.current_state
        action_filt = self.transitions_df['action'] == action
        prob_filt = transition_prob <= self.transitions_df['cumul_prob']
        transisition_filt = self.transitions_df[from_state_filt & action_filt & prob_filt]
        # ... then take the row with the smallest qualifying cumulative probability:
        # that is the sampled transition
        current_transition = transisition_filt.loc[transisition_filt['cumul_prob'].idxmin()]
        reward = current_transition['reward_distro'].sample_rew()
        new_state = current_transition['to_state']
        self.current_state = new_state
        self.total_reward += reward
        self.time_step += 1
        self.done = new_state == self.terminal_state
        # for gym, we convert the state to an integer.
        if self.stable_baselines:
            new_state = self.possible_states.index(new_state)
        # the empty dictionary object below is used by gym to store any metadata or extra info
        # you can use it like this as well, and store whatever extra info might be interesting for
        # you! Still, all info relevant to the action choice from your agents needs to be somehow
        # encoded in the state!
        return new_state, reward, self.done, {}

    def reset(self):
        """
        Reset method, useful to reset the environment to an initial state so that a new episode
        can be played.
        :return: the integer index of the sampled start state.
            NOTE(review): unlike step(), this returns an index even when
            stable_baselines is False — confirm callers expect that asymmetry.
        """
        self.done = False
        self.total_reward = 0
        self.time_step = 0
        # start uniformly at random among states that have outgoing transitions
        self.current_state = np.random.choice(self.transitions_df['from_state'].unique())
        return self.possible_states.index(self.current_state)

    def render(self, mode='human'):
        """
        Rendering the environment for an entity.
        :param mode: several modes may be implemented. the env can be rendered e.g. in a way that is
            "human" understandable.
        :return: A visualization or Nothing if the visualization is rendered internally.
            In the current implementation we return the transition dataframe, the total reward and
            the timesteps.
        """
        return self.transitions_df, self.total_reward, self.time_step

    # some utility functions
    def get_possible_actions(self, from_state):
        """
        Returns a list with possible actions, given a state.
        :param from_state: the state to look up in the transition table
        :return: list of the unique action labels available from that state
        """
        filt = self.transitions_df['from_state'] == from_state
        return self.transitions_df[filt]['action'].unique().tolist()

    def get_possible_states(self, from_state, action):
        """
        State transitions from a current state and an action.
        :param from_state: the current state
        :param action: the action applied after observing the current state
        :return: a dataframe with a "to_state" column, a "prob" column with the probability
            to transit to that state, and the deterministic "reward" of each transition.
        """
        state_filt = self.transitions_df['from_state'] == from_state
        action_filt = self.transitions_df['action'] == action
        return self.transitions_df.loc[state_filt & action_filt, ['to_state', 'prob', 'reward']]
def define_transition(from_state, action, to_state, prob, reward_distro):
    """Build one row of the transition table as a plain dictionary.

    :param from_state: a representation of the state the agent starts from
    :param action: a representation of the chosen action
    :param to_state: a representation of the state the agent can land in
    :param prob: the probability of this transition, between 0 and 1
    :param reward_distro: the object that generates rewards; must expose a
        ``value`` attribute with its deterministic/expected reward
    :return: a dict with the keys expected by ``FiniteMDPEnv``
    """
    row = {
        'from_state': from_state,
        'to_state': to_state,
        'action': action,
        'prob': prob,
        'reward': reward_distro.value,
        'reward_distro': reward_distro,
    }
    return row
from utilities import DeterministicReward
# Now let's make the MDP of the budget example: one define_transition row per
# possible (from_state, action, to_state) combination, with its probability and reward.
budget_transitions = [
    define_transition('Low', 'Save', 'Medium', 1, DeterministicReward(1)),
    define_transition('Low', 'Invest', 'Low', 0.9, DeterministicReward(0)),
    define_transition('Low', 'Invest', 'High', 0.1, DeterministicReward(1)),
    define_transition('Medium', 'Save', 'High', 0.2, DeterministicReward(1)),
    define_transition('Medium', 'Save', 'Medium', 0.8, DeterministicReward(0)),
    define_transition('Medium', 'Invest', 'High', 0.4, DeterministicReward(1)),
    define_transition('Medium', 'Invest', 'Low', 0.6, DeterministicReward(-1)),
    define_transition('High', 'Invest', 'Low', 0.3, DeterministicReward(-1)),
    define_transition('High', 'Invest', 'Medium', 0.6, DeterministicReward(-1)),
    define_transition('High', 'Invest', 'Unlimited', 0.1, DeterministicReward(100)),
    define_transition('High', 'Save', 'High', 0.999, DeterministicReward(0)),
    define_transition('High', 'Save', 'Unlimited', 0.001, DeterministicReward(100))]
# string-state environment; the episode ends when 'Unlimited' budget is reached
invest_environment = FiniteMDPEnv(budget_transitions, 'Unlimited')
Now let's create the environment with stable baselines compatibility:
from stable_baselines.common.vec_env import DummyVecEnv

def _make_sb_env():
    # factory producing a numeric (stable-baselines friendly) copy of the MDP env
    return FiniteMDPEnv(budget_transitions, 'Unlimited', stable_baselines=True)

# DummyVecEnv expects a list of zero-argument environment factories
env = DummyVecEnv([_make_sb_env])
And now we create an Actor-Critic model whose policy is computed by a Multi-Layer Perceptron (a "dense neural network") of 2 layers with 60 neurons each.
from stable_baselines import ACER
# Actor-Critic with Experience Replay: short rollouts (n_steps=2) and an
# equal ratio of replayed to fresh experience (replay_ratio=1).
algo_kwargs = dict(policy="MlpPolicy", env=env, n_steps=2, replay_ratio=1)
model = ACER(**algo_kwargs)
Now we train the model for 5000 global timesteps. The environment is automatically reset, and a new episode starts, each time `environment.step` returns `done=True`.
# Train the agent for 5000 global environment steps (episodes reset automatically).
model.learn(total_timesteps=5000)
# containers for the per-episode returns and episode lengths of the trained agent
ac_rewards, ac_time = [], []
Now we play 50 episodes with our trained model predicting the moves based on the observed state.
# Evaluate the trained agent over 50 episodes, capping each episode at 50 steps.
for _episode in range(50):
    step_limit = 50
    obs = env.reset()
    episode_return = 0
    steps_taken = 0
    done = False
    while steps_taken < step_limit and not done:
        action, _states = model.predict(obs)
        obs, reward, done, _info = env.step(action)
        # vectorized env returns arrays; our single env sits at index 0
        episode_return += reward[0]
        steps_taken += 1
    ac_rewards.append(episode_return)
    ac_time.append(steps_taken)
From here on we repeat the steps from previous lecture to compare the Actor Critic policy performance with the previously tested policies.
# re-create the vectorized env so the comparison starts from a fresh environment
env = DummyVecEnv([lambda: FiniteMDPEnv(budget_transitions, 'Unlimited', stable_baselines=True)])
from policies import ProbabilisticPolicy, any_deterministic_policy, any_esoft_policy
# hand-crafted baseline policy: always invest when Low, always save otherwise
my_policy = ProbabilisticPolicy(invest_environment)
my_policy.set_probability('Low', 'Invest', 1)
my_policy.set_probability('Medium', 'Save', 1)
my_policy.set_probability('High', 'Save', 1)
my_policy.policy_table
# label the policy table's column axis for nicer display in comparisons
my_policy.policy_table = my_policy.policy_table.rename_axis("My policy", axis=1)
my_policy.policy_table
# Dynamic programming baseline
from dynamic_programming import ValueIteration, get_greedy_policy_table
# the old version of the environment with string representations
ie2 = FiniteMDPEnv(budget_transitions, 'Unlimited', stable_baselines=False)
vi = ValueIteration(ie2, renderer='svg')
value_estimates_dict = vi.estimate(plot_convergence=False)
# greedy policy with respect to the converged state-value estimates
vi_optimal_policy = get_greedy_policy_table(ie2, value_estimates_dict)
vi_optimal_policy = vi_optimal_policy.rename_axis('Value Iteration Policy', axis=1)
# Monte Carlo baseline
from monte_carlo import OnPolicyFirstVisitMC
# epsilon-soft on-policy first-visit Monte Carlo control
mc = OnPolicyFirstVisitMC(ie2, epsilon=0.2)
# 5000 timesteps / 50 steps per episode = 100 episodes — presumably chosen to
# roughly match ACER's training budget; confirm against the lecture material
mc_policy, mc_value_estimates = mc.estimate(max_episodes=5000//50)
# Play 50 episodes with the hand-crafted policy and record lengths and returns.
my_timesteps_dist, my_reward_dist = [], []
for _ in range(50):
    trajectory = my_policy.play(ie2, max_steps=50)
    my_timesteps_dist.append(trajectory.shape[0])
    my_reward_dist.append(ie2.total_reward)
# Wrap the value-iteration policy table in a playable policy and evaluate it.
opt_policy = ProbabilisticPolicy(ie2)
opt_policy.policy_table = vi_optimal_policy
opt_timesteps_dist, opt_reward_dist = [], []
for _ in range(50):
    trajectory = opt_policy.play(ie2, max_steps=50)
    opt_timesteps_dist.append(trajectory.shape[0])
    opt_reward_dist.append(ie2.total_reward)
# Same evaluation for the Monte-Carlo-learned policy.
mcc_policy = ProbabilisticPolicy(ie2)
mcc_policy.policy_table = mc_policy
mc_timesteps_dist, mc_reward_dist = [], []
for _ in range(50):
    trajectory = mcc_policy.play(ie2, max_steps=50)
    mc_timesteps_dist.append(trajectory.shape[0])
    mc_reward_dist.append(ie2.total_reward)
# Compare the total-reward distributions of all four policies over 50 episodes.
fig = ff.create_distplot([ac_rewards, my_reward_dist, opt_reward_dist, mc_reward_dist], ['actor-critic',
                                                                                         'mine',
                                                                                         'dynamic programming',
                                                                                         'monte-carlo'], show_hist=False)
# FIX: the plotted samples are episode returns, not episode lengths, so the
# x-axis label 'Total Timesteps' was wrong.
fig.layout.xaxis.title = 'Total Reward'
fig.layout.yaxis.title = 'Density'
fig.show(renderer='notebook')
Comparing the Actor-Critic model with monte carlo and dynamic programming more or less yields the same total_reward distribution in a sample of 50 episodes!
Let's see now how changing the reward landscape actually changes the behavior of our algorithms, and actually may not help in reaching the target state.
Let's see now what happens if rewards do not increase the closer we get to our goal. This means that higher rewards may be assigned to transitions that don't bring us closer to our goal.
# Flattened reward landscape: rewards no longer grow toward the goal state
# 'Unlimited' — reaching the goal now pays less than wandering among other states.
budget_transitions = [
    define_transition('Low', 'Save', 'Medium', 1, DeterministicReward(3)),
    define_transition('Low', 'Invest', 'Low', 0.9, DeterministicReward(2)),
    define_transition('Low', 'Invest', 'High', 0.1, DeterministicReward(1)),
    define_transition('Medium', 'Save', 'High', 0.2, DeterministicReward(3)),
    define_transition('Medium', 'Save', 'Medium', 0.8, DeterministicReward(2)),
    define_transition('Medium', 'Invest', 'High', 0.4, DeterministicReward(2)),
    define_transition('Medium', 'Invest', 'Low', 0.6, DeterministicReward(1)),
    define_transition('High', 'Invest', 'Low', 0.3, DeterministicReward(2)),
    define_transition('High', 'Invest', 'Medium', 0.6, DeterministicReward(1)),
    define_transition('High', 'Invest', 'Unlimited', 0.1, DeterministicReward(1)),
    define_transition('High', 'Save', 'High', 0.999, DeterministicReward(2)),
    define_transition('High', 'Save', 'Unlimited', 0.001, DeterministicReward(1))]
def _make_flat_reward_env():
    # factory for the numeric env over the flattened reward landscape
    return FiniteMDPEnv(budget_transitions, 'Unlimited', stable_baselines=True)

nrenv = DummyVecEnv([_make_flat_reward_env])
# retrain ACER from scratch with the same hyper-parameters as before
model = ACER(policy="MlpPolicy", env=nrenv, n_steps=2, replay_ratio=1)
model.learn(total_timesteps=5000)
# Evaluate the agent trained on the flattened rewards, 50 episodes of <= 50 steps.
ac_rewards_ur, ac_time_ur = [], []
for _episode in range(50):
    step_limit = 50
    obs = nrenv.reset()
    episode_return = 0
    steps_taken = 0
    done = False
    while steps_taken < step_limit and not done:
        action, _states = model.predict(obs)
        obs, reward, done, _info = nrenv.step(action)
        # vectorized env returns arrays; our single env sits at index 0
        episode_return += reward[0]
        steps_taken += 1
    ac_rewards_ur.append(episode_return)
    ac_time_ur.append(steps_taken)
# Compare the episode-length distributions under the two reward landscapes.
length_samples = [ac_time, ac_time_ur]
length_labels = ['ac-initial-reward', 'ac-random-reward']
fig = ff.create_distplot(length_samples, length_labels, show_hist=False)
fig.show('notebook')
As we can see, the timestep distribution has a peak closer to the maximum limit (50) when rewards don't increase as we approach the goal. In general, assigning higher rewards to irrelevant actions and states can severely impact model performance.